You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/09/18 00:37:29 UTC
[3/6] storm git commit: Leverage sampling to reduce the overhead of QueueMetrics
Leverage sampling to reduce the overhead of QueueMetrics
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/272a9c6b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/272a9c6b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/272a9c6b
Branch: refs/heads/master
Commit: 272a9c6ba73e609e9f503638ef7258021681365c
Parents: d03caab
Author: wangli1426 <wa...@gmail.com>
Authored: Thu Sep 3 22:45:17 2015 +0800
Committer: wangli1426 <wa...@gmail.com>
Committed: Thu Sep 3 22:45:17 2015 +0800
----------------------------------------------------------------------
.../backtype/storm/utils/DisruptorQueue.java | 34 ++++++++++++++++++--
1 file changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/272a9c6b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 2ed26a5..4448959 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -28,6 +28,7 @@ import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.SingleThreadedClaimStrategy;
import com.lmax.disruptor.WaitStrategy;
+import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@@ -71,7 +72,7 @@ public class DisruptorQueue implements IStatefulObject {
_consumer = new Sequence();
_barrier = _buffer.newBarrier();
_buffer.setGatingSequences(_consumer);
- _metrics = new QueueMetrics();
+ _metrics = new QueueMetrics((float)0.05);
if (claim instanceof SingleThreadedClaimStrategy) {
consumerStartedFlag = true;
@@ -225,6 +226,27 @@ public class DisruptorQueue implements IStatefulObject {
public class QueueMetrics {
private final RateTracker _rateTracker = new RateTracker(10000, 10);
+ private final float _sampleRate;
+ private Random _random;
+
+ public QueueMetrics() throws IllegalArgumentException {
+ this(1);
+ }
+
+ /**
+ * @param sampleRate a number between 0 and 1. The higher it is, the more accurate the metrics
+ * will be. Using a reasonable sampleRate, e.g., 0.1, could effectively reduce the
+ * metric maintenance cost while providing good accuracy.
+ */
+ public QueueMetrics(float sampleRate) throws IllegalArgumentException {
+
+ if (sampleRate <= 0 || sampleRate > 1)
+ throw new IllegalArgumentException("sampleRate should be a value between (0,1].");
+
+ _sampleRate = sampleRate;
+
+ _random = new Random();
+ }
public long writePos() {
return _buffer.getCursor();
@@ -253,7 +275,7 @@ public class DisruptorQueue implements IStatefulObject {
long rp = readPos();
long wp = writePos();
- final float arrivalRateInMils = _rateTracker.reportRate();
+ final float arrivalRateInMils = _rateTracker.reportRate() / _sampleRate;
state.put("capacity", capacity());
state.put("population", wp - rp);
@@ -265,9 +287,15 @@ public class DisruptorQueue implements IStatefulObject {
}
public void notifyArrivals(long counts) {
- _rateTracker.notify(counts);
+ if (sample())
+ _rateTracker.notify(counts);
}
+ final private boolean sample() {
+ if (_sampleRate == 1 || _random.nextFloat() < _sampleRate)
+ return true;
+ return false;
+ }
}
}
\ No newline at end of file