You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/09/28 19:17:05 UTC

[2/3] git commit: add RestorableMeter

add RestorableMeter


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c3b7669d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c3b7669d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c3b7669d

Branch: refs/heads/trunk
Commit: c3b7669d003cc18fbba5f8c9bd3258c624f54c4d
Parents: c8915ce
Author: Jonathan Ellis <jb...@apache.org>
Authored: Sat Sep 28 12:16:55 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Sat Sep 28 12:16:55 2013 -0500

----------------------------------------------------------------------
 .../cassandra/metrics/RestorableMeter.java      | 185 +++++++++++++++++++
 1 file changed, 185 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3b7669d/src/java/org/apache/cassandra/metrics/RestorableMeter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/RestorableMeter.java b/src/java/org/apache/cassandra/metrics/RestorableMeter.java
new file mode 100644
index 0000000..7da2ff9
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/RestorableMeter.java
@@ -0,0 +1,185 @@
+package org.apache.cassandra.metrics;
+
+import com.yammer.metrics.core.Clock;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.lang.Math.exp;
+
+/**
+ * A meter metric which measures mean throughput as well as fifteen-minute and two-hour
+ * exponentially-weighted moving average throughputs.
+ *
+ * This is based heavily on the Meter and EWMA classes from codahale/yammer metrics.
+ *
+ * @see <a href="http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average">EMA</a>
+ */
+public class RestorableMeter
+{
+    private static final long TICK_INTERVAL = TimeUnit.SECONDS.toNanos(5);
+    private static final double NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
+
+    private final RestorableEWMA m15Rate;
+    private final RestorableEWMA m120Rate;
+
+    private final AtomicLong count = new AtomicLong();
+    private final long startTime;
+    private final AtomicLong lastTick;
+    private final Clock clock = Clock.defaultClock();
+
+    /**
+     * Creates a new, uninitialized RestorableMeter.
+     */
+    public RestorableMeter() {
+        this.m15Rate = new RestorableEWMA(TimeUnit.MINUTES.toSeconds(15));
+        this.m120Rate = new RestorableEWMA(TimeUnit.MINUTES.toSeconds(120));
+        this.startTime = this.clock.tick();
+        this.lastTick = new AtomicLong(startTime);
+    }
+
+    /**
+     * Restores a RestorableMeter from the last seen 15m and 2h rates.
+     * @param lastM15Rate the last-seen 15m rate, in terms of events per second
+     * @param lastM120Rate the last seen 2h rate, in terms of events per second
+     */
+    public RestorableMeter(double lastM15Rate, double lastM120Rate) {
+        this.m15Rate = new RestorableEWMA(lastM15Rate, TimeUnit.MINUTES.toSeconds(15));
+        this.m120Rate = new RestorableEWMA(lastM120Rate, TimeUnit.MINUTES.toSeconds(120));
+        this.startTime = this.clock.tick();
+        this.lastTick = new AtomicLong(startTime);
+    }
+
+    /**
+     * Updates the moving averages as needed.
+     */
+    private void tickIfNecessary() {
+        final long oldTick = lastTick.get();
+        final long newTick = clock.tick();
+        final long age = newTick - oldTick;
+        if (age > TICK_INTERVAL) {
+            final long newIntervalStartTick = newTick - age % TICK_INTERVAL;
+            if (lastTick.compareAndSet(oldTick, newIntervalStartTick)) {
+                final long requiredTicks = age / TICK_INTERVAL;
+                for (long i = 0; i < requiredTicks; i++) {
+                    m15Rate.tick();
+                    m120Rate.tick();
+                }
+            }
+        }
+    }
+
+    /**
+     * Mark the occurrence of an event.
+     */
+    public void mark() {
+        mark(1);
+    }
+
+    /**
+     * Mark the occurrence of a given number of events.
+     *
+     * @param n the number of events
+     */
+    public void mark(long n) {
+        tickIfNecessary();
+        count.addAndGet(n);
+        m15Rate.update(n);
+        m120Rate.update(n);
+    }
+
+    /**
+     * Returns the 15-minute rate in terms of events per second.  This carries the previous rate when restored.
+     */
+    public double fifteenMinuteRate() {
+        tickIfNecessary();
+        return m15Rate.rate();
+    }
+
+    /**
+     * Returns the two-hour rate in terms of events per second.  This carries the previous rate when restored.
+     */
+    public double twoHourRate() {
+        tickIfNecessary();
+        return m120Rate.rate();
+    }
+
+    /**
+     * The total number of events that have occurred since this object was created.  Note that the previous count
+     * is *not* carried over when a RestorableMeter is restored.
+     */
+    public long count() {
+        return count.get();
+    }
+
+    /**
+     * Returns the mean rate of events per second since this object was created.  Note that the mean rate
+     * does *not* carry over when a RestorableMeter is restored, so the mean rate is only a measure since
+     * this object was created.
+     */
+    public double meanRate() {
+        if (count() == 0) {
+            return 0.0;
+        } else {
+            final long elapsed = (clock.tick() - startTime);
+            return (count() / (double) elapsed) * NANOS_PER_SECOND;
+        }
+    }
+
+    class RestorableEWMA {
+        private volatile boolean initialized = false;
+        private volatile double rate = 0.0; // average rate in terms of events per nanosecond
+
+        private final AtomicLong uncounted = new AtomicLong();
+        private final double alpha, interval;
+
+        /**
+         * Create a new, uninitialized EWMA with a given window.
+         *
+         * @param windowInSeconds the window of time this EWMA should average over, expressed as a number of seconds
+         */
+        public RestorableEWMA(long windowInSeconds) {
+            this.alpha = 1 - exp((-TICK_INTERVAL / NANOS_PER_SECOND) / windowInSeconds);
+            this.interval = (double) TICK_INTERVAL;
+        }
+
+        /**
+         * Restore an EWMA from a last-seen rate and a given window.
+         *
+         * @param intervalInSeconds the window of time this EWMA should average over, expressed as a number of seconds
+         */
+        public RestorableEWMA(double lastRate, long intervalInSeconds) {
+            this(intervalInSeconds);
+            this.rate = lastRate / NANOS_PER_SECOND;
+            this.initialized = true;
+        }
+
+        /**
+         * Update the moving average with a new value.
+         */
+        public void update(long n) {
+            uncounted.addAndGet(n);
+        }
+
+        /**
+         * Mark the passage of time and decay the current rate accordingly.
+         */
+        public void tick() {
+            final long count = uncounted.getAndSet(0);
+            final double instantRate = count / interval;
+            if (initialized) {
+                rate += (alpha * (instantRate - rate));
+            } else {
+                rate = instantRate;
+                initialized = true;
+            }
+        }
+
+        /**
+         * Returns the rate in terms of events per second.
+         */
+        public double rate() {
+            return rate * NANOS_PER_SECOND;
+        }
+    }
+}
\ No newline at end of file