You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/10/03 15:40:18 UTC

[1/4] storm git commit: Updated RateTracker to be thread safe, more accurate, and very fast. Removed subsampling of the rate calculation in the disruptor, because subsampling was more expensive than just doing the calculation.

Repository: storm
Updated Branches:
  refs/heads/master 86ea8b218 -> 7cf4d2596


Updated RateTracker to be thread safe, more accurate, and very fast. Removed subsampling of the rate calculation in the disruptor, because subsampling was more expensive than just doing the calculation.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dbcb1386
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dbcb1386
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dbcb1386

Branch: refs/heads/master
Commit: dbcb138603f55fc9f234d2d3b2404a2bfc92e4d9
Parents: ce93d5f
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Oct 1 13:00:51 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Oct 1 14:04:30 2015 -0500

----------------------------------------------------------------------
 .../backtype/storm/utils/DisruptorQueue.java    |  51 ++-----
 .../jvm/backtype/storm/utils/RateTracker.java   | 146 ++++++++++++-------
 .../backtype/storm/utils/RateTrackerTest.java   |  61 +++++---
 3 files changed, 149 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/dbcb1386/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index e0053e9..72590e5 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -76,7 +76,7 @@ public class DisruptorQueue implements IStatefulObject {
         _consumer = new Sequence();
         _barrier = _buffer.newBarrier();
         _buffer.setGatingSequences(_consumer);
-        _metrics = new QueueMetrics((float) 0.05);
+        _metrics = new QueueMetrics();
 
         if (claim instanceof SingleThreadedClaimStrategy) {
             consumerStartedFlag = true;
@@ -264,34 +264,11 @@ public class DisruptorQueue implements IStatefulObject {
         return _metrics;
     }
 
-
     /**
      * This inner class provides methods to access the metrics of the disruptor queue.
      */
     public class QueueMetrics {
-
         private final RateTracker _rateTracker = new RateTracker(10000, 10);
-        private final float _sampleRate;
-        private Random _random;
-
-        public QueueMetrics() throws IllegalArgumentException {
-            this(1);
-        }
-
-        /**
-         * @param sampleRate a number between 0 and 1. The higher it is, the accurate the metrics
-         *                   will be. Using a reasonable sampleRate, e.g., 0.1, could effectively reduce the
-         *                   metric maintenance cost while providing good accuracy.
-         */
-        public QueueMetrics(float sampleRate) throws IllegalArgumentException {
-
-            if (sampleRate <= 0 || sampleRate > 1)
-                throw new IllegalArgumentException("sampleRate should be a value between (0,1].");
-
-            _sampleRate = sampleRate;
-
-            _random = new Random();
-        }
 
         public long writePos() {
             return _buffer.getCursor();
@@ -320,35 +297,25 @@ public class DisruptorQueue implements IStatefulObject {
             long rp = readPos();
             long wp = writePos();
 
-            final float arrivalRateInMils = _rateTracker.reportRate() / _sampleRate;
+            final double arrivalRateInSecs = _rateTracker.reportRate();
 
-            /*
-            Assume the queue is stable, in which the arrival rate is equal to the consumption rate.
-            If this assumption does not hold, the calculation of sojourn time should also consider
-            departure rate according to Queuing Theory.
-             */
-            final float sojournTime = (wp - rp) / (float) Math.max(arrivalRateInMils, 0.00001);
+            //Assume the queue is stable, in which the arrival rate is equal to the consumption rate.
+            // If this assumption does not hold, the calculation of sojourn time should also consider
+            // departure rate according to Queuing Theory.
+            final double sojournTime = (wp - rp) / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
 
             state.put("capacity", capacity());
             state.put("population", wp - rp);
             state.put("write_pos", wp);
             state.put("read_pos", rp);
-            state.put("arrival_rate", arrivalRateInMils); //arrivals per millisecond
-            state.put("sojourn_time", sojournTime); //element sojourn time in milliseconds
+            state.put("arrival_rate_secs", arrivalRateInSecs);
+            state.put("sojourn_time_ms", sojournTime); //element sojourn time in milliseconds
 
             return state;
         }
 
         public void notifyArrivals(long counts) {
-            if (sample())
-                _rateTracker.notify(counts);
-        }
-
-        final private boolean sample() {
-            if (_sampleRate == 1 || _random.nextFloat() < _sampleRate)
-                return true;
-            return false;
+            _rateTracker.notify(counts);
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/dbcb1386/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/RateTracker.java b/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
index f937b1f..5105c09 100644
--- a/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
+++ b/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
@@ -17,53 +17,61 @@
  */
 package backtype.storm.utils;
 
-import java.util.*;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * This class is a utility to track the rate.
+ * This class is a utility to track the rate of something.
  */
 public class RateTracker{
-    /* number of slides to keep in the history. */
-    public final int _numOfSlides; // number of slides to keep in the history
-
-    public final int _slideSizeInMils;
-    private final long[] _histograms;// an array storing the number of element for each time slide.
-
-    private int _currentValidSlideNum;
-
-    private static Timer _timer = new Timer();
+    private final int _bucketSizeMillis;
+    //Old Buckets and their length are only touched when rotating or gathering the metrics, which should not be that frequent
+    // As such all access to them should be protected by synchronizing with the RateTracker instance
+    private final long[] _bucketTime;
+    private final long[] _oldBuckets;
+    
+    private final AtomicLong _bucketStart;
+    private final AtomicLong _currentBucket;
+    
+    private final TimerTask _task;
+    private static Timer _timer = new Timer("rate tracker timer", true);
 
     /**
      * @param validTimeWindowInMils events that happened before validTimeWindowInMils are not considered
      *                        when reporting the rate.
-     * @param numOfSlides the number of time sildes to divide validTimeWindows. The more slides,
+     * @param numBuckets the number of time sildes to divide validTimeWindows. The more buckets,
      *                    the smother the reported results will be.
      */
-    public RateTracker(int validTimeWindowInMils, int numOfSlides) {
-        this(validTimeWindowInMils, numOfSlides, false);
+    public RateTracker(int validTimeWindowInMils, int numBuckets) {
+        this(validTimeWindowInMils, numBuckets, -1);
     }
 
     /**
      * Constructor
      * @param validTimeWindowInMils events that happened before validTimeWindow are not considered
      *                        when reporting the rate.
-     * @param numOfSlides the number of time sildes to divide validTimeWindows. The more slides,
+     * @param numBuckets the number of time sildes to divide validTimeWindows. The more buckets,
      *                    the smother the reported results will be.
-     * @param simulate set true if it use simulated time rather than system time for testing purpose.
+     * @param startTime if positive the simulated time to start the first bucket at.
      */
-    public RateTracker(int validTimeWindowInMils, int numOfSlides, boolean simulate ){
-        _numOfSlides = Math.max(numOfSlides, 1);
-        _slideSizeInMils = validTimeWindowInMils / _numOfSlides;
-        if (_slideSizeInMils < 1 ) {
-            throw new IllegalArgumentException("Illeggal argument for RateTracker");
+    RateTracker(int validTimeWindowInMils, int numBuckets, long startTime){
+        numBuckets = Math.max(numBuckets, 1);
+        _bucketSizeMillis = validTimeWindowInMils / numBuckets;
+        if (_bucketSizeMillis < 1 ) {
+            throw new IllegalArgumentException("validTimeWindowInMilis and numOfSildes cause each slide to have a window that is too small");
         }
-        assert(_slideSizeInMils > 1);
-        _histograms = new long[_numOfSlides];
-        Arrays.fill(_histograms,0L);
-        if(!simulate) {
-            _timer.scheduleAtFixedRate(new Fresher(), _slideSizeInMils, _slideSizeInMils);
+        _bucketTime = new long[numBuckets - 1];
+        _oldBuckets = new long[numBuckets - 1];
+
+        _bucketStart = new AtomicLong(startTime >= 0 ? startTime : System.currentTimeMillis());
+        _currentBucket = new AtomicLong(0);
+        if (startTime < 0) {
+            _task = new Fresher();
+            _timer.scheduleAtFixedRate(_task, _bucketSizeMillis, _bucketSizeMillis);
+        } else {
+            _task = null;
         }
-        _currentValidSlideNum = 1;
     }
 
     /**
@@ -72,48 +80,86 @@ public class RateTracker{
      * @param count number of arrivals
      */
     public void notify(long count) {
-        _histograms[_histograms.length-1]+=count;
+        _currentBucket.addAndGet(count);
     }
 
     /**
-     * Return the average rate in slides.
-     *
-     * @return the average rate
+     * @return the approximate average rate per second.
      */
-    public final float reportRate() {
-        long sum = 0;
-        long duration = _currentValidSlideNum * _slideSizeInMils;
-        for(int i=_numOfSlides - _currentValidSlideNum; i < _numOfSlides; i++ ){
-            sum += _histograms[i];
-        }
-
-        return sum / (float) duration * 1000;
+    public synchronized double reportRate() {
+        return reportRate(System.currentTimeMillis());
     }
 
-    public final void forceUpdateSlides(int numToEclipse) {
-
-        for(int i=0; i< numToEclipse; i++) {
-            updateSlides();
+    synchronized double reportRate(long currentTime) {
+        long duration = Math.max(1l, currentTime - _bucketStart.get());
+        long events = _currentBucket.get();
+        for (int i = 0; i < _oldBuckets.length; i++) {
+            events += _oldBuckets[i];
+            duration += _bucketTime[i];
         }
 
+        return events * 1000.0 / duration;
     }
 
-    private void updateSlides(){
-
-        for (int i = 0; i < _numOfSlides - 1; i++) {
-            _histograms[i] = _histograms[i + 1];
+    public void close() {
+        if (_task != null) {
+            _task.cancel();
         }
+    }
 
-        _histograms[_histograms.length - 1] = 0;
+    /**
+     * Rotate the buckets a set number of times for testing purposes.
+     * @param numToEclipse the number of rotations to perform.
+     */
+    final void forceRotate(int numToEclipse, long interval) {
+        long time = _bucketStart.get();
+        for (int i = 0; i < numToEclipse; i++) {
+            time += interval;
+            rotateBuckets(time);
+        }
+    }
 
-        _currentValidSlideNum = Math.min(_currentValidSlideNum + 1, _numOfSlides);
+    private synchronized void rotateBuckets(long time) {
+        long timeSpent = time - _bucketStart.getAndSet(time); 
+        long currentVal = _currentBucket.getAndSet(0);
+        for (int i = 0; i < _oldBuckets.length; i++) {
+            long tmpTime = _bucketTime[i];
+            _bucketTime[i] = timeSpent;
+            timeSpent = tmpTime;
+
+            long cnt = _oldBuckets[i];
+            _oldBuckets[i] = currentVal;
+            currentVal = cnt;
+        }
     }
 
     private class Fresher extends TimerTask {
         public void run () {
-            updateSlides();
+            rotateBuckets(System.currentTimeMillis());
         }
     }
 
+    public static void main (String args[]) throws Exception {
+        final int number = (args.length >= 1) ? Integer.parseInt(args[0]) : 100000000;
+        for (int i = 0; i < 10; i++) {
+            testRate(number);
+        }
+    }
 
+    private static void testRate(int number) {
+        RateTracker rt = new RateTracker(10000, 10);
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < number; i++) {
+            rt.notify(1);
+            if ((i % 1000000) == 0) {
+                //There seems to be an issue with java that when we are under a very heavy load
+                // the timer thread does not get called.  This is a work around for that.
+                Thread.yield();
+            }
+        }
+        long end = System.currentTimeMillis();
+        double rate = rt.reportRate();
+        rt.close();
+        System.out.printf("time %,8d count %,8d rate %,15.2f reported rate %,15.2f\n", end-start,number, ((number * 1000.0)/(end-start)), rate);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/dbcb1386/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java b/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
index d11bbf6..0c50b80 100644
--- a/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
+++ b/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
@@ -17,9 +17,9 @@
  */
 package backtype.storm.utils;
 
-import org.junit.Assert;
 import org.junit.Test;
 import junit.framework.TestCase;
+import static org.junit.Assert.*;
 
 /**
  * Unit test for RateTracker
@@ -27,36 +27,63 @@ import junit.framework.TestCase;
 public class RateTrackerTest extends TestCase {
 
     @Test
+    public void testExactRate() {
+        final long interval = 1000l;
+        long time = 0l;
+        RateTracker rt = new RateTracker(10000, 10, time);
+        double [] expected = new double[] {10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0};
+        for (int i = 0; i < expected.length; i++) {
+            double exp = expected[i];
+            rt.notify(10);
+            time += interval;
+            double actual = rt.reportRate(time);
+            rt.forceRotate(1, interval);
+            assertEquals("Expected rate on iteration "+i+" is wrong.", exp, actual, 0.00001);
+        }
+        expected = new double[] {11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0};
+        for (int i = 0; i < expected.length; i++) {
+            double exp = expected[i];
+            rt.notify(20);
+            time += interval;
+            double actual = rt.reportRate(time);
+            rt.forceRotate(1, interval);
+            assertEquals("Expected rate on iteration "+i+" is wrong.", exp, actual, 0.00001);
+        }
+    }
+
+
+    @Test
     public void testEclipsedAllWindows() {
-        RateTracker rt = new RateTracker(10000, 10, true);
+        long time = 0;
+        RateTracker rt = new RateTracker(10000, 10, time);
         rt.notify(10);
-        rt.forceUpdateSlides(10);
-        assert (rt.reportRate() == 0);
+        rt.forceRotate(10, 1000l);
+        assertEquals(0.0, rt.reportRate(10000l), 0.00001);
     }
 
     @Test
     public void testEclipsedOneWindow() {
-        RateTracker rt = new RateTracker(10000, 10, true);
+        long time = 0;
+        RateTracker rt = new RateTracker(10000, 10, time);
         rt.notify(1);
-        float r1 = rt.reportRate();
-        rt.forceUpdateSlides(1);
+        double r1 = rt.reportRate(1000l);
+        rt.forceRotate(1, 1000l);
         rt.notify(1);
-        float r2 = rt.reportRate();
-
-        System.out.format("r1:%f, r2:%f\n", r1, r2);
+        double r2 = rt.reportRate(2000l);
 
-        assert (r1 == r2);
+        assertEquals(r1, r2, 0.00001);
     }
 
     @Test
     public void testEclipsedNineWindows() {
-        RateTracker rt = new RateTracker(10000, 10, true);
+        long time = 0;
+        RateTracker rt = new RateTracker(10000, 10, time);
         rt.notify(1);
-        float r1 = rt.reportRate();
-        rt.forceUpdateSlides(9);
+        double r1 = rt.reportRate(1000);
+        rt.forceRotate(9, 1000);
         rt.notify(9);
-        float r2 = rt.reportRate();
+        double r2 = rt.reportRate(10000);
 
-        assert (r1 == r2);
+        assertEquals(r1, r2, 0.00001);
     }
-}
\ No newline at end of file
+}


[3/4] storm git commit: Merge branch 'STORM-1078' of https://github.com/revans2/incubator-storm into STORM-1078

Posted by bo...@apache.org.
Merge branch 'STORM-1078' of https://github.com/revans2/incubator-storm into STORM-1078

STORM-1078: Updated RateTracker to be thread safe


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/83183433
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/83183433
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/83183433

Branch: refs/heads/master
Commit: 831834338266275780b4479b3afce2a162f62974
Parents: 86ea8b2 dad2a81
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Sat Oct 3 08:31:22 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Sat Oct 3 08:31:22 2015 -0500

----------------------------------------------------------------------
 .../backtype/storm/utils/DisruptorQueue.java    |  51 ++-----
 .../jvm/backtype/storm/utils/RateTracker.java   | 147 ++++++++++++-------
 .../backtype/storm/utils/RateTrackerTest.java   |  66 ++++++---
 3 files changed, 155 insertions(+), 109 deletions(-)
----------------------------------------------------------------------



[4/4] storm git commit: Added STORM-1078 to Changelog.

Posted by bo...@apache.org.
Added STORM-1078 to Changelog.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7cf4d259
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7cf4d259
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7cf4d259

Branch: refs/heads/master
Commit: 7cf4d25968b5185cbe011b7106d06fb15a77d45b
Parents: 8318343
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Sat Oct 3 08:31:48 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Sat Oct 3 08:31:48 2015 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7cf4d259/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d9892eb..bc3eac1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1078: Updated RateTracker to be thread safe
  * STORM-1082: fix nits for properties in kafka tests
  * STORM-993: include uptimeSeconds as JSON integer field
  * STORM-1053: Update storm-kafka README for new producer API confs.


[2/4] storm git commit: Updated comments according to review

Posted by bo...@apache.org.
Updated comments according to review


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dad2a811
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dad2a811
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dad2a811

Branch: refs/heads/master
Commit: dad2a811f0658f39bf95611fc9657230112bd712
Parents: dbcb138
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Oct 2 09:39:51 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Oct 2 09:39:51 2015 -0500

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/utils/RateTracker.java      | 5 +++--
 storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java | 5 +++++
 2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/dad2a811/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/RateTracker.java b/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
index 5105c09..a490ecc 100644
--- a/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
+++ b/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
@@ -152,8 +152,9 @@ public class RateTracker{
         for (int i = 0; i < number; i++) {
             rt.notify(1);
             if ((i % 1000000) == 0) {
-                //There seems to be an issue with java that when we are under a very heavy load
-                // the timer thread does not get called.  This is a work around for that.
+                //There is an issue with some JVM versions where an integer for loop that takes a long time
+                // can starve other threads resulting in  the timer thread not getting called.
+                // This is a work around for that, and we still get the same results.
                 Thread.yield();
             }
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/dad2a811/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java b/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
index 0c50b80..4f5eacb 100644
--- a/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
+++ b/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
@@ -28,6 +28,8 @@ public class RateTrackerTest extends TestCase {
 
     @Test
     public void testExactRate() {
+        //This test is in two phases.  The first phase fills up the 10 buckets with 10 tuples each
+        // We purposely simulate a 1 second bucket size so the rate will always be 10 per second.
         final long interval = 1000l;
         long time = 0l;
         RateTracker rt = new RateTracker(10000, 10, time);
@@ -40,6 +42,9 @@ public class RateTrackerTest extends TestCase {
             rt.forceRotate(1, interval);
             assertEquals("Expected rate on iteration "+i+" is wrong.", exp, actual, 0.00001);
         }
+        //In the second part of the test the rate doubles to 20 per second but the rate tracker
+        // increases its result slowly as we push the 10 tuples per second buckets out and relpace them
+        // with 20 tuples per second. 
         expected = new double[] {11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0};
         for (int i = 0; i < expected.length; i++) {
             double exp = expected[i];