You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/09/18 00:37:27 UTC
[1/6] storm git commit: Added arrival_rate metric to DisruptorQueue
Repository: storm
Updated Branches:
refs/heads/master 7f445d7d8 -> a5a7ba181
Added arrival_rate metric to DisruptorQueue
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3b8db7ad
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3b8db7ad
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3b8db7ad
Branch: refs/heads/master
Commit: 3b8db7adc64750a33694fad8ae7080dc83623d17
Parents: 154e9ec
Author: wangli1426 <wa...@gmail.com>
Authored: Tue Sep 1 22:42:00 2015 +0800
Committer: wangli1426 <wa...@gmail.com>
Committed: Tue Sep 1 22:42:00 2015 +0800
----------------------------------------------------------------------
.../backtype/storm/utils/DisruptorQueue.java | 20 +++-
.../jvm/backtype/storm/utils/RateTracker.java | 119 +++++++++++++++++++
2 files changed, 135 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3b8db7ad/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index c922ca5..2ed26a5 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p/>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -34,11 +34,11 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.HashMap;
import java.util.Map;
+
import backtype.storm.metric.api.IStatefulObject;
/**
- *
* A single consumer queue that uses the LMAX Disruptor. They key to the performance is
* the ability to catch up to the producer by processing tuples in batches.
*/
@@ -189,6 +189,7 @@ public class DisruptorQueue implements IStatefulObject {
final MutableObject m = _buffer.get(id);
m.setObject(obj);
_buffer.publish(id);
+ _metrics.notifyArrivals(1);
}
public void consumerStarted() {
@@ -223,6 +224,8 @@ public class DisruptorQueue implements IStatefulObject {
*/
public class QueueMetrics {
+ private final RateTracker _rateTracker = new RateTracker(10000, 10);
+
public long writePos() {
return _buffer.getCursor();
}
@@ -249,13 +252,22 @@ public class DisruptorQueue implements IStatefulObject {
// get readPos then writePos so it's never an under-estimate
long rp = readPos();
long wp = writePos();
+
+ final float arrivalRateInMils = _rateTracker.reportRate();
+
state.put("capacity", capacity());
state.put("population", wp - rp);
state.put("write_pos", wp);
state.put("read_pos", rp);
+ state.put("arrival_rate", arrivalRateInMils); //arrivals per millisecond
return state;
}
+
+ public void notifyArrivals(long counts) {
+ _rateTracker.notify(counts);
+ }
+
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/3b8db7ad/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/RateTracker.java b/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
new file mode 100644
index 0000000..f937b1f
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.utils;
+
+import java.util.*;
+
+/**
+ * This class is a utility to track the rate.
+ */
+public class RateTracker{
+ /* number of slides to keep in the history. */
+ public final int _numOfSlides; // number of slides to keep in the history
+
+ public final int _slideSizeInMils;
+ private final long[] _histograms;// an array storing the number of element for each time slide.
+
+ private int _currentValidSlideNum;
+
+ private static Timer _timer = new Timer();
+
+ /**
+ * @param validTimeWindowInMils events that happened before validTimeWindowInMils are not considered
+ * when reporting the rate.
+ * @param numOfSlides the number of time sildes to divide validTimeWindows. The more slides,
+ * the smother the reported results will be.
+ */
+ public RateTracker(int validTimeWindowInMils, int numOfSlides) {
+ this(validTimeWindowInMils, numOfSlides, false);
+ }
+
+ /**
+ * Constructor
+ * @param validTimeWindowInMils events that happened before validTimeWindow are not considered
+ * when reporting the rate.
+ * @param numOfSlides the number of time sildes to divide validTimeWindows. The more slides,
+ * the smother the reported results will be.
+ * @param simulate set true if it use simulated time rather than system time for testing purpose.
+ */
+ public RateTracker(int validTimeWindowInMils, int numOfSlides, boolean simulate ){
+ _numOfSlides = Math.max(numOfSlides, 1);
+ _slideSizeInMils = validTimeWindowInMils / _numOfSlides;
+ if (_slideSizeInMils < 1 ) {
+ throw new IllegalArgumentException("Illeggal argument for RateTracker");
+ }
+ assert(_slideSizeInMils > 1);
+ _histograms = new long[_numOfSlides];
+ Arrays.fill(_histograms,0L);
+ if(!simulate) {
+ _timer.scheduleAtFixedRate(new Fresher(), _slideSizeInMils, _slideSizeInMils);
+ }
+ _currentValidSlideNum = 1;
+ }
+
+ /**
+ * Notify the tracker upon new arrivals
+ *
+ * @param count number of arrivals
+ */
+ public void notify(long count) {
+ _histograms[_histograms.length-1]+=count;
+ }
+
+ /**
+ * Return the average rate in slides.
+ *
+ * @return the average rate
+ */
+ public final float reportRate() {
+ long sum = 0;
+ long duration = _currentValidSlideNum * _slideSizeInMils;
+ for(int i=_numOfSlides - _currentValidSlideNum; i < _numOfSlides; i++ ){
+ sum += _histograms[i];
+ }
+
+ return sum / (float) duration * 1000;
+ }
+
+ public final void forceUpdateSlides(int numToEclipse) {
+
+ for(int i=0; i< numToEclipse; i++) {
+ updateSlides();
+ }
+
+ }
+
+ private void updateSlides(){
+
+ for (int i = 0; i < _numOfSlides - 1; i++) {
+ _histograms[i] = _histograms[i + 1];
+ }
+
+ _histograms[_histograms.length - 1] = 0;
+
+ _currentValidSlideNum = Math.min(_currentValidSlideNum + 1, _numOfSlides);
+ }
+
+ private class Fresher extends TimerTask {
+ public void run () {
+ updateSlides();
+ }
+ }
+
+
+}
[5/6] storm git commit: Merge branch 'queue-metrics' of
https://github.com/wangli1426/storm into STORM-1007
Posted by ka...@apache.org.
Merge branch 'queue-metrics' of https://github.com/wangli1426/storm into STORM-1007
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ae1b8816
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ae1b8816
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ae1b8816
Branch: refs/heads/master
Commit: ae1b8816b06e99d860784a195dca68055c23b6bb
Parents: 7f445d7 f15ac94
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Sep 18 07:29:16 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Sep 18 07:29:16 2015 +0900
----------------------------------------------------------------------
.../backtype/storm/utils/DisruptorQueue.java | 58 ++++++++-
.../jvm/backtype/storm/utils/RateTracker.java | 119 +++++++++++++++++++
.../backtype/storm/utils/RateTrackerTest.java | 62 ++++++++++
3 files changed, 234 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
[4/6] storm git commit: Element sojourn time, a new metric,
is added to QueueMetrics
Posted by ka...@apache.org.
Element sojourn time, a new metric, is added to QueueMetrics
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f15ac94d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f15ac94d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f15ac94d
Branch: refs/heads/master
Commit: f15ac94d259a96ac891f5243af29b0d953ea863d
Parents: 272a9c6
Author: wangli1426 <wa...@gmail.com>
Authored: Thu Sep 3 23:00:31 2015 +0800
Committer: wangli1426 <wa...@gmail.com>
Committed: Thu Sep 3 23:00:31 2015 +0800
----------------------------------------------------------------------
.../src/jvm/backtype/storm/utils/DisruptorQueue.java | 10 +++++++++-
.../test/jvm/backtype/storm/utils/RateTrackerTest.java | 4 ++--
2 files changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f15ac94d/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 4448959..c54232e 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -72,7 +72,7 @@ public class DisruptorQueue implements IStatefulObject {
_consumer = new Sequence();
_barrier = _buffer.newBarrier();
_buffer.setGatingSequences(_consumer);
- _metrics = new QueueMetrics((float)0.05);
+ _metrics = new QueueMetrics((float) 0.05);
if (claim instanceof SingleThreadedClaimStrategy) {
consumerStartedFlag = true;
@@ -277,11 +277,19 @@ public class DisruptorQueue implements IStatefulObject {
final float arrivalRateInMils = _rateTracker.reportRate() / _sampleRate;
+ /*
+ Assume the queue is stable, in which the arrival rate is equal to the consumption rate.
+ If this assumption does not hold, the calculation of sojourn time should also consider
+ departure rate according to Queuing Theory.
+ */
+ final float sojournTime = (wp - rp) / (float) Math.max(arrivalRateInMils, 0.00001);
+
state.put("capacity", capacity());
state.put("population", wp - rp);
state.put("write_pos", wp);
state.put("read_pos", rp);
state.put("arrival_rate", arrivalRateInMils); //arrivals per millisecond
+ state.put("sojourn_time", sojournTime); //element sojourn time in milliseconds
return state;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f15ac94d/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java b/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
index fde112e..d11bbf6 100644
--- a/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
+++ b/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p/>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
[6/6] storm git commit: add STORM-1007 to CHANGELOG.md
Posted by ka...@apache.org.
add STORM-1007 to CHANGELOG.md
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a5a7ba18
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a5a7ba18
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a5a7ba18
Branch: refs/heads/master
Commit: a5a7ba181017413674c4edaf09cf003dfe8c8e62
Parents: ae1b881
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Sep 18 07:37:08 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Sep 18 07:37:08 2015 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a5a7ba18/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index fc076fc..587a6a2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-1007: Add more metrics to DisruptorQueue
* STORM-1011: HBaseBolt default mapper should handle null values
* STORM-1019: Added missing dependency version to use of org.codehaus.mojo:make-maven-plugin
* STORM-1020: Document exceptions in ITuple & Fields
[3/6] storm git commit: Leverage sampling to reduce the overhead of
QueueMetrics
Posted by ka...@apache.org.
Leverage sampling to reduce the overhead of QueueMetrics
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/272a9c6b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/272a9c6b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/272a9c6b
Branch: refs/heads/master
Commit: 272a9c6ba73e609e9f503638ef7258021681365c
Parents: d03caab
Author: wangli1426 <wa...@gmail.com>
Authored: Thu Sep 3 22:45:17 2015 +0800
Committer: wangli1426 <wa...@gmail.com>
Committed: Thu Sep 3 22:45:17 2015 +0800
----------------------------------------------------------------------
.../backtype/storm/utils/DisruptorQueue.java | 34 ++++++++++++++++++--
1 file changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/272a9c6b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 2ed26a5..4448959 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -28,6 +28,7 @@ import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.SingleThreadedClaimStrategy;
import com.lmax.disruptor.WaitStrategy;
+import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@@ -71,7 +72,7 @@ public class DisruptorQueue implements IStatefulObject {
_consumer = new Sequence();
_barrier = _buffer.newBarrier();
_buffer.setGatingSequences(_consumer);
- _metrics = new QueueMetrics();
+ _metrics = new QueueMetrics((float)0.05);
if (claim instanceof SingleThreadedClaimStrategy) {
consumerStartedFlag = true;
@@ -225,6 +226,27 @@ public class DisruptorQueue implements IStatefulObject {
public class QueueMetrics {
private final RateTracker _rateTracker = new RateTracker(10000, 10);
+ private final float _sampleRate;
+ private Random _random;
+
+ public QueueMetrics() throws IllegalArgumentException {
+ this(1);
+ }
+
+ /**
+ * @param sampleRate a number between 0 and 1. The higher it is, the accurate the metrics
+ * will be. Using a reasonable sampleRate, e.g., 0.1, could effectively reduce the
+ * metric maintenance cost while providing good accuracy.
+ */
+ public QueueMetrics(float sampleRate) throws IllegalArgumentException {
+
+ if (sampleRate <= 0 || sampleRate > 1)
+ throw new IllegalArgumentException("sampleRate should be a value between (0,1].");
+
+ _sampleRate = sampleRate;
+
+ _random = new Random();
+ }
public long writePos() {
return _buffer.getCursor();
@@ -253,7 +275,7 @@ public class DisruptorQueue implements IStatefulObject {
long rp = readPos();
long wp = writePos();
- final float arrivalRateInMils = _rateTracker.reportRate();
+ final float arrivalRateInMils = _rateTracker.reportRate() / _sampleRate;
state.put("capacity", capacity());
state.put("population", wp - rp);
@@ -265,9 +287,15 @@ public class DisruptorQueue implements IStatefulObject {
}
public void notifyArrivals(long counts) {
- _rateTracker.notify(counts);
+ if (sample())
+ _rateTracker.notify(counts);
}
+ final private boolean sample() {
+ if (_sampleRate == 1 || _random.nextFloat() < _sampleRate)
+ return true;
+ return false;
+ }
}
}
\ No newline at end of file
[2/6] storm git commit: Created test cases for RateTracker
Posted by ka...@apache.org.
Created test cases for RateTracker
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d03caab0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d03caab0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d03caab0
Branch: refs/heads/master
Commit: d03caab09307af5f8960f33632006b5ff18941ef
Parents: 3b8db7a
Author: wangli1426 <wa...@gmail.com>
Authored: Wed Sep 2 22:43:51 2015 +0800
Committer: wangli1426 <wa...@gmail.com>
Committed: Wed Sep 2 22:43:51 2015 +0800
----------------------------------------------------------------------
.../backtype/storm/utils/RateTrackerTest.java | 62 ++++++++++++++++++++
1 file changed, 62 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d03caab0/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java b/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
new file mode 100644
index 0000000..fde112e
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import junit.framework.TestCase;
+
+/**
+ * Unit test for RateTracker
+ */
+public class RateTrackerTest extends TestCase {
+
+ @Test
+ public void testEclipsedAllWindows() {
+ RateTracker rt = new RateTracker(10000, 10, true);
+ rt.notify(10);
+ rt.forceUpdateSlides(10);
+ assert (rt.reportRate() == 0);
+ }
+
+ @Test
+ public void testEclipsedOneWindow() {
+ RateTracker rt = new RateTracker(10000, 10, true);
+ rt.notify(1);
+ float r1 = rt.reportRate();
+ rt.forceUpdateSlides(1);
+ rt.notify(1);
+ float r2 = rt.reportRate();
+
+ System.out.format("r1:%f, r2:%f\n", r1, r2);
+
+ assert (r1 == r2);
+ }
+
+ @Test
+ public void testEclipsedNineWindows() {
+ RateTracker rt = new RateTracker(10000, 10, true);
+ rt.notify(1);
+ float r1 = rt.reportRate();
+ rt.forceUpdateSlides(9);
+ rt.notify(9);
+ float r2 = rt.reportRate();
+
+ assert (r1 == r2);
+ }
+}
\ No newline at end of file