You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/09/18 00:37:29 UTC

[3/6] storm git commit: Leverage sampling to reduce the overhead of QueueMetrics

Leverage sampling to reduce the overhead of QueueMetrics


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/272a9c6b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/272a9c6b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/272a9c6b

Branch: refs/heads/master
Commit: 272a9c6ba73e609e9f503638ef7258021681365c
Parents: d03caab
Author: wangli1426 <wa...@gmail.com>
Authored: Thu Sep 3 22:45:17 2015 +0800
Committer: wangli1426 <wa...@gmail.com>
Committed: Thu Sep 3 22:45:17 2015 +0800

----------------------------------------------------------------------
 .../backtype/storm/utils/DisruptorQueue.java    | 34 ++++++++++++++++++--
 1 file changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/272a9c6b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 2ed26a5..4448959 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -28,6 +28,7 @@ import com.lmax.disruptor.SequenceBarrier;
 import com.lmax.disruptor.SingleThreadedClaimStrategy;
 import com.lmax.disruptor.WaitStrategy;
 
+import java.util.Random;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
@@ -71,7 +72,7 @@ public class DisruptorQueue implements IStatefulObject {
         _consumer = new Sequence();
         _barrier = _buffer.newBarrier();
         _buffer.setGatingSequences(_consumer);
-        _metrics = new QueueMetrics();
+        _metrics = new QueueMetrics((float)0.05);
 
         if (claim instanceof SingleThreadedClaimStrategy) {
             consumerStartedFlag = true;
@@ -225,6 +226,27 @@ public class DisruptorQueue implements IStatefulObject {
     public class QueueMetrics {
 
         private final RateTracker _rateTracker = new RateTracker(10000, 10);
+        private final float _sampleRate;
+        private Random _random;
+
+        public QueueMetrics() throws IllegalArgumentException {
+            this(1);
+        }
+
+        /**
+         * @param sampleRate a number between 0 and 1. The higher it is, the accurate the metrics
+         *                   will be. Using a reasonable sampleRate, e.g., 0.1, could effectively reduce the
+         *                   metric maintenance cost while providing good accuracy.
+         */
+        public QueueMetrics(float sampleRate) throws IllegalArgumentException {
+
+            if (sampleRate <= 0 || sampleRate > 1)
+                throw new IllegalArgumentException("sampleRate should be a value between (0,1].");
+
+            _sampleRate = sampleRate;
+
+            _random = new Random();
+        }
 
         public long writePos() {
             return _buffer.getCursor();
@@ -253,7 +275,7 @@ public class DisruptorQueue implements IStatefulObject {
             long rp = readPos();
             long wp = writePos();
 
-            final float arrivalRateInMils = _rateTracker.reportRate();
+            final float arrivalRateInMils = _rateTracker.reportRate() / _sampleRate;
 
             state.put("capacity", capacity());
             state.put("population", wp - rp);
@@ -265,9 +287,15 @@ public class DisruptorQueue implements IStatefulObject {
         }
 
         public void notifyArrivals(long counts) {
-            _rateTracker.notify(counts);
+            if (sample())
+                _rateTracker.notify(counts);
         }
 
+        final private boolean sample() {
+            if (_sampleRate == 1 || _random.nextFloat() < _sampleRate)
+                return true;
+            return false;
+        }
     }
 
 }
\ No newline at end of file