You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@storm.apache.org by bo...@apache.org on 2015/10/03 15:40:18 UTC
[1/4] storm git commit: Updated RateTracker to be thread safe, more accurate, and very fast. Removed subsampling of the rate calculation in the disruptor queue, because subsampling was more expensive than just doing the calculation.
Repository: storm
Updated Branches:
refs/heads/master 86ea8b218 -> 7cf4d2596
Updated RateTracker to be thread safe, more accurate, and very fast. Removed subsampling of the rate calculation in the disruptor queue, because subsampling was more expensive than just doing the calculation.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dbcb1386
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dbcb1386
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dbcb1386
Branch: refs/heads/master
Commit: dbcb138603f55fc9f234d2d3b2404a2bfc92e4d9
Parents: ce93d5f
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Oct 1 13:00:51 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Oct 1 14:04:30 2015 -0500
----------------------------------------------------------------------
.../backtype/storm/utils/DisruptorQueue.java | 51 ++-----
.../jvm/backtype/storm/utils/RateTracker.java | 146 ++++++++++++-------
.../backtype/storm/utils/RateTrackerTest.java | 61 +++++---
3 files changed, 149 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/dbcb1386/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index e0053e9..72590e5 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -76,7 +76,7 @@ public class DisruptorQueue implements IStatefulObject {
_consumer = new Sequence();
_barrier = _buffer.newBarrier();
_buffer.setGatingSequences(_consumer);
- _metrics = new QueueMetrics((float) 0.05);
+ _metrics = new QueueMetrics();
if (claim instanceof SingleThreadedClaimStrategy) {
consumerStartedFlag = true;
@@ -264,34 +264,11 @@ public class DisruptorQueue implements IStatefulObject {
return _metrics;
}
-
/**
* This inner class provides methods to access the metrics of the disruptor queue.
*/
public class QueueMetrics {
-
private final RateTracker _rateTracker = new RateTracker(10000, 10);
- private final float _sampleRate;
- private Random _random;
-
- public QueueMetrics() throws IllegalArgumentException {
- this(1);
- }
-
- /**
- * @param sampleRate a number between 0 and 1. The higher it is, the accurate the metrics
- * will be. Using a reasonable sampleRate, e.g., 0.1, could effectively reduce the
- * metric maintenance cost while providing good accuracy.
- */
- public QueueMetrics(float sampleRate) throws IllegalArgumentException {
-
- if (sampleRate <= 0 || sampleRate > 1)
- throw new IllegalArgumentException("sampleRate should be a value between (0,1].");
-
- _sampleRate = sampleRate;
-
- _random = new Random();
- }
public long writePos() {
return _buffer.getCursor();
@@ -320,35 +297,25 @@ public class DisruptorQueue implements IStatefulObject {
long rp = readPos();
long wp = writePos();
- final float arrivalRateInMils = _rateTracker.reportRate() / _sampleRate;
+ final double arrivalRateInSecs = _rateTracker.reportRate();
- /*
- Assume the queue is stable, in which the arrival rate is equal to the consumption rate.
- If this assumption does not hold, the calculation of sojourn time should also consider
- departure rate according to Queuing Theory.
- */
- final float sojournTime = (wp - rp) / (float) Math.max(arrivalRateInMils, 0.00001);
+ //Assume the queue is stable, in which the arrival rate is equal to the consumption rate.
+ // If this assumption does not hold, the calculation of sojourn time should also consider
+ // departure rate according to Queuing Theory.
+ final double sojournTime = (wp - rp) / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
state.put("capacity", capacity());
state.put("population", wp - rp);
state.put("write_pos", wp);
state.put("read_pos", rp);
- state.put("arrival_rate", arrivalRateInMils); //arrivals per millisecond
- state.put("sojourn_time", sojournTime); //element sojourn time in milliseconds
+ state.put("arrival_rate_secs", arrivalRateInSecs);
+ state.put("sojourn_time_ms", sojournTime); //element sojourn time in milliseconds
return state;
}
public void notifyArrivals(long counts) {
- if (sample())
- _rateTracker.notify(counts);
- }
-
- final private boolean sample() {
- if (_sampleRate == 1 || _random.nextFloat() < _sampleRate)
- return true;
- return false;
+ _rateTracker.notify(counts);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/dbcb1386/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/RateTracker.java b/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
index f937b1f..5105c09 100644
--- a/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
+++ b/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
@@ -17,53 +17,61 @@
*/
package backtype.storm.utils;
-import java.util.*;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicLong;
/**
- * This class is a utility to track the rate.
+ * This class is a utility to track the rate of something.
*/
public class RateTracker{
- /* number of slides to keep in the history. */
- public final int _numOfSlides; // number of slides to keep in the history
-
- public final int _slideSizeInMils;
- private final long[] _histograms;// an array storing the number of element for each time slide.
-
- private int _currentValidSlideNum;
-
- private static Timer _timer = new Timer();
+ private final int _bucketSizeMillis;
+ //Old Buckets and their length are only touched when rotating or gathering the metrics, which should not be that frequent
+ // As such all access to them should be protected by synchronizing with the RateTracker instance
+ private final long[] _bucketTime;
+ private final long[] _oldBuckets;
+
+ private final AtomicLong _bucketStart;
+ private final AtomicLong _currentBucket;
+
+ private final TimerTask _task;
+ private static Timer _timer = new Timer("rate tracker timer", true);
/**
* @param validTimeWindowInMils events that happened before validTimeWindowInMils are not considered
* when reporting the rate.
- * @param numOfSlides the number of time sildes to divide validTimeWindows. The more slides,
+ * @param numBuckets the number of time sildes to divide validTimeWindows. The more buckets,
* the smother the reported results will be.
*/
- public RateTracker(int validTimeWindowInMils, int numOfSlides) {
- this(validTimeWindowInMils, numOfSlides, false);
+ public RateTracker(int validTimeWindowInMils, int numBuckets) {
+ this(validTimeWindowInMils, numBuckets, -1);
}
/**
* Constructor
* @param validTimeWindowInMils events that happened before validTimeWindow are not considered
* when reporting the rate.
- * @param numOfSlides the number of time sildes to divide validTimeWindows. The more slides,
+ * @param numBuckets the number of time sildes to divide validTimeWindows. The more buckets,
* the smother the reported results will be.
- * @param simulate set true if it use simulated time rather than system time for testing purpose.
+ * @param startTime if positive the simulated time to start the first bucket at.
*/
- public RateTracker(int validTimeWindowInMils, int numOfSlides, boolean simulate ){
- _numOfSlides = Math.max(numOfSlides, 1);
- _slideSizeInMils = validTimeWindowInMils / _numOfSlides;
- if (_slideSizeInMils < 1 ) {
- throw new IllegalArgumentException("Illeggal argument for RateTracker");
+ RateTracker(int validTimeWindowInMils, int numBuckets, long startTime){
+ numBuckets = Math.max(numBuckets, 1);
+ _bucketSizeMillis = validTimeWindowInMils / numBuckets;
+ if (_bucketSizeMillis < 1 ) {
+ throw new IllegalArgumentException("validTimeWindowInMilis and numOfSildes cause each slide to have a window that is too small");
}
- assert(_slideSizeInMils > 1);
- _histograms = new long[_numOfSlides];
- Arrays.fill(_histograms,0L);
- if(!simulate) {
- _timer.scheduleAtFixedRate(new Fresher(), _slideSizeInMils, _slideSizeInMils);
+ _bucketTime = new long[numBuckets - 1];
+ _oldBuckets = new long[numBuckets - 1];
+
+ _bucketStart = new AtomicLong(startTime >= 0 ? startTime : System.currentTimeMillis());
+ _currentBucket = new AtomicLong(0);
+ if (startTime < 0) {
+ _task = new Fresher();
+ _timer.scheduleAtFixedRate(_task, _bucketSizeMillis, _bucketSizeMillis);
+ } else {
+ _task = null;
}
- _currentValidSlideNum = 1;
}
/**
@@ -72,48 +80,86 @@ public class RateTracker{
* @param count number of arrivals
*/
public void notify(long count) {
- _histograms[_histograms.length-1]+=count;
+ _currentBucket.addAndGet(count);
}
/**
- * Return the average rate in slides.
- *
- * @return the average rate
+ * @return the approximate average rate per second.
*/
- public final float reportRate() {
- long sum = 0;
- long duration = _currentValidSlideNum * _slideSizeInMils;
- for(int i=_numOfSlides - _currentValidSlideNum; i < _numOfSlides; i++ ){
- sum += _histograms[i];
- }
-
- return sum / (float) duration * 1000;
+ public synchronized double reportRate() {
+ return reportRate(System.currentTimeMillis());
}
- public final void forceUpdateSlides(int numToEclipse) {
-
- for(int i=0; i< numToEclipse; i++) {
- updateSlides();
+ synchronized double reportRate(long currentTime) {
+ long duration = Math.max(1l, currentTime - _bucketStart.get());
+ long events = _currentBucket.get();
+ for (int i = 0; i < _oldBuckets.length; i++) {
+ events += _oldBuckets[i];
+ duration += _bucketTime[i];
}
+ return events * 1000.0 / duration;
}
- private void updateSlides(){
-
- for (int i = 0; i < _numOfSlides - 1; i++) {
- _histograms[i] = _histograms[i + 1];
+ public void close() {
+ if (_task != null) {
+ _task.cancel();
}
+ }
- _histograms[_histograms.length - 1] = 0;
+ /**
+ * Rotate the buckets a set number of times for testing purposes.
+ * @param numToEclipse the number of rotations to perform.
+ */
+ final void forceRotate(int numToEclipse, long interval) {
+ long time = _bucketStart.get();
+ for (int i = 0; i < numToEclipse; i++) {
+ time += interval;
+ rotateBuckets(time);
+ }
+ }
- _currentValidSlideNum = Math.min(_currentValidSlideNum + 1, _numOfSlides);
+ private synchronized void rotateBuckets(long time) {
+ long timeSpent = time - _bucketStart.getAndSet(time);
+ long currentVal = _currentBucket.getAndSet(0);
+ for (int i = 0; i < _oldBuckets.length; i++) {
+ long tmpTime = _bucketTime[i];
+ _bucketTime[i] = timeSpent;
+ timeSpent = tmpTime;
+
+ long cnt = _oldBuckets[i];
+ _oldBuckets[i] = currentVal;
+ currentVal = cnt;
+ }
}
private class Fresher extends TimerTask {
public void run () {
- updateSlides();
+ rotateBuckets(System.currentTimeMillis());
}
}
+ public static void main (String args[]) throws Exception {
+ final int number = (args.length >= 1) ? Integer.parseInt(args[0]) : 100000000;
+ for (int i = 0; i < 10; i++) {
+ testRate(number);
+ }
+ }
+ private static void testRate(int number) {
+ RateTracker rt = new RateTracker(10000, 10);
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < number; i++) {
+ rt.notify(1);
+ if ((i % 1000000) == 0) {
+ //There seems to be an issue with java that when we are under a very heavy load
+ // the timer thread does not get called. This is a work around for that.
+ Thread.yield();
+ }
+ }
+ long end = System.currentTimeMillis();
+ double rate = rt.reportRate();
+ rt.close();
+ System.out.printf("time %,8d count %,8d rate %,15.2f reported rate %,15.2f\n", end-start,number, ((number * 1000.0)/(end-start)), rate);
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/dbcb1386/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java b/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
index d11bbf6..0c50b80 100644
--- a/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
+++ b/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
@@ -17,9 +17,9 @@
*/
package backtype.storm.utils;
-import org.junit.Assert;
import org.junit.Test;
import junit.framework.TestCase;
+import static org.junit.Assert.*;
/**
* Unit test for RateTracker
@@ -27,36 +27,63 @@ import junit.framework.TestCase;
public class RateTrackerTest extends TestCase {
@Test
+ public void testExactRate() {
+ final long interval = 1000l;
+ long time = 0l;
+ RateTracker rt = new RateTracker(10000, 10, time);
+ double [] expected = new double[] {10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0};
+ for (int i = 0; i < expected.length; i++) {
+ double exp = expected[i];
+ rt.notify(10);
+ time += interval;
+ double actual = rt.reportRate(time);
+ rt.forceRotate(1, interval);
+ assertEquals("Expected rate on iteration "+i+" is wrong.", exp, actual, 0.00001);
+ }
+ expected = new double[] {11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0};
+ for (int i = 0; i < expected.length; i++) {
+ double exp = expected[i];
+ rt.notify(20);
+ time += interval;
+ double actual = rt.reportRate(time);
+ rt.forceRotate(1, interval);
+ assertEquals("Expected rate on iteration "+i+" is wrong.", exp, actual, 0.00001);
+ }
+ }
+
+
+ @Test
public void testEclipsedAllWindows() {
- RateTracker rt = new RateTracker(10000, 10, true);
+ long time = 0;
+ RateTracker rt = new RateTracker(10000, 10, time);
rt.notify(10);
- rt.forceUpdateSlides(10);
- assert (rt.reportRate() == 0);
+ rt.forceRotate(10, 1000l);
+ assertEquals(0.0, rt.reportRate(10000l), 0.00001);
}
@Test
public void testEclipsedOneWindow() {
- RateTracker rt = new RateTracker(10000, 10, true);
+ long time = 0;
+ RateTracker rt = new RateTracker(10000, 10, time);
rt.notify(1);
- float r1 = rt.reportRate();
- rt.forceUpdateSlides(1);
+ double r1 = rt.reportRate(1000l);
+ rt.forceRotate(1, 1000l);
rt.notify(1);
- float r2 = rt.reportRate();
-
- System.out.format("r1:%f, r2:%f\n", r1, r2);
+ double r2 = rt.reportRate(2000l);
- assert (r1 == r2);
+ assertEquals(r1, r2, 0.00001);
}
@Test
public void testEclipsedNineWindows() {
- RateTracker rt = new RateTracker(10000, 10, true);
+ long time = 0;
+ RateTracker rt = new RateTracker(10000, 10, time);
rt.notify(1);
- float r1 = rt.reportRate();
- rt.forceUpdateSlides(9);
+ double r1 = rt.reportRate(1000);
+ rt.forceRotate(9, 1000);
rt.notify(9);
- float r2 = rt.reportRate();
+ double r2 = rt.reportRate(10000);
- assert (r1 == r2);
+ assertEquals(r1, r2, 0.00001);
}
-}
\ No newline at end of file
+}
[3/4] storm git commit: Merge branch 'STORM-1078' of https://github.com/revans2/incubator-storm into STORM-1078
Posted by bo...@apache.org.
Merge branch 'STORM-1078' of https://github.com/revans2/incubator-storm into STORM-1078
STORM-1078: Updated RateTracker to be thread safe
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/83183433
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/83183433
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/83183433
Branch: refs/heads/master
Commit: 831834338266275780b4479b3afce2a162f62974
Parents: 86ea8b2 dad2a81
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Sat Oct 3 08:31:22 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Sat Oct 3 08:31:22 2015 -0500
----------------------------------------------------------------------
.../backtype/storm/utils/DisruptorQueue.java | 51 ++-----
.../jvm/backtype/storm/utils/RateTracker.java | 147 ++++++++++++-------
.../backtype/storm/utils/RateTrackerTest.java | 66 ++++++---
3 files changed, 155 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
[4/4] storm git commit: Added STORM-1078 to Changelog.
Posted by bo...@apache.org.
Added STORM-1078 to Changelog.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7cf4d259
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7cf4d259
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7cf4d259
Branch: refs/heads/master
Commit: 7cf4d25968b5185cbe011b7106d06fb15a77d45b
Parents: 8318343
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Sat Oct 3 08:31:48 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Sat Oct 3 08:31:48 2015 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/7cf4d259/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d9892eb..bc3eac1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-1078: Updated RateTracker to be thread safe
* STORM-1082: fix nits for properties in kafka tests
* STORM-993: include uptimeSeconds as JSON integer field
* STORM-1053: Update storm-kafka README for new producer API confs.
[2/4] storm git commit: Updated comments according to review
Posted by bo...@apache.org.
Updated comments according to review
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dad2a811
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dad2a811
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dad2a811
Branch: refs/heads/master
Commit: dad2a811f0658f39bf95611fc9657230112bd712
Parents: dbcb138
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Oct 2 09:39:51 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Oct 2 09:39:51 2015 -0500
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/utils/RateTracker.java | 5 +++--
storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java | 5 +++++
2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/dad2a811/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/RateTracker.java b/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
index 5105c09..a490ecc 100644
--- a/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
+++ b/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
@@ -152,8 +152,9 @@ public class RateTracker{
for (int i = 0; i < number; i++) {
rt.notify(1);
if ((i % 1000000) == 0) {
- //There seems to be an issue with java that when we are under a very heavy load
- // the timer thread does not get called. This is a work around for that.
+ //There is an issue with some JVM versions where an integer for loop that takes a long time
+ // can starve other threads resulting in the timer thread not getting called.
+ // This is a work around for that, and we still get the same results.
Thread.yield();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/dad2a811/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java b/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
index 0c50b80..4f5eacb 100644
--- a/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
+++ b/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
@@ -28,6 +28,8 @@ public class RateTrackerTest extends TestCase {
@Test
public void testExactRate() {
+ //This test is in two phases. The first phase fills up the 10 buckets with 10 tuples each
+ // We purposely simulate a 1 second bucket size so the rate will always be 10 per second.
final long interval = 1000l;
long time = 0l;
RateTracker rt = new RateTracker(10000, 10, time);
@@ -40,6 +42,9 @@ public class RateTrackerTest extends TestCase {
rt.forceRotate(1, interval);
assertEquals("Expected rate on iteration "+i+" is wrong.", exp, actual, 0.00001);
}
+ //In the second part of the test the rate doubles to 20 per second but the rate tracker
+ // increases its result slowly as we push the 10 tuples per second buckets out and relpace them
+ // with 20 tuples per second.
expected = new double[] {11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0};
for (int i = 0; i < expected.length; i++) {
double exp = expected[i];