You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/09/18 00:37:27 UTC

[1/6] storm git commit: Added arrival_rate metric to DisruptorQueue

Repository: storm
Updated Branches:
  refs/heads/master 7f445d7d8 -> a5a7ba181


Added arrival_rate metric to DisruptorQueue


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3b8db7ad
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3b8db7ad
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3b8db7ad

Branch: refs/heads/master
Commit: 3b8db7adc64750a33694fad8ae7080dc83623d17
Parents: 154e9ec
Author: wangli1426 <wa...@gmail.com>
Authored: Tue Sep 1 22:42:00 2015 +0800
Committer: wangli1426 <wa...@gmail.com>
Committed: Tue Sep 1 22:42:00 2015 +0800

----------------------------------------------------------------------
 .../backtype/storm/utils/DisruptorQueue.java    |  20 +++-
 .../jvm/backtype/storm/utils/RateTracker.java   | 119 +++++++++++++++++++
 2 files changed, 135 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3b8db7ad/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index c922ca5..2ed26a5 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -34,11 +34,11 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.HashMap;
 import java.util.Map;
+
 import backtype.storm.metric.api.IStatefulObject;
 
 
 /**
- *
  * A single consumer queue that uses the LMAX Disruptor. They key to the performance is
  * the ability to catch up to the producer by processing tuples in batches.
  */
@@ -189,6 +189,7 @@ public class DisruptorQueue implements IStatefulObject {
         final MutableObject m = _buffer.get(id);
         m.setObject(obj);
         _buffer.publish(id);
+        _metrics.notifyArrivals(1);
     }
 
     public void consumerStarted() {
@@ -223,6 +224,8 @@ public class DisruptorQueue implements IStatefulObject {
      */
     public class QueueMetrics {
 
+        private final RateTracker _rateTracker = new RateTracker(10000, 10);
+
         public long writePos() {
             return _buffer.getCursor();
         }
@@ -249,13 +252,22 @@ public class DisruptorQueue implements IStatefulObject {
             // get readPos then writePos so it's never an under-estimate
             long rp = readPos();
             long wp = writePos();
+
+            final float arrivalRateInMils = _rateTracker.reportRate();
+
             state.put("capacity", capacity());
             state.put("population", wp - rp);
             state.put("write_pos", wp);
             state.put("read_pos", rp);
+            state.put("arrival_rate", arrivalRateInMils); //arrivals per millisecond
 
             return state;
         }
+
+        public void notifyArrivals(long counts) {
+            _rateTracker.notify(counts);
+        }
+
     }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/3b8db7ad/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/RateTracker.java b/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
new file mode 100644
index 0000000..f937b1f
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/RateTracker.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.utils;
+
+import java.util.*;
+
+/**
+ * This class is a utility to track the rate.
+ */
+public class RateTracker{
+    /* number of slides to keep in the history. */
+    public final int _numOfSlides; // number of slides to keep in the history
+
+    public final int _slideSizeInMils;
+    private final long[] _histograms;// an array storing the number of element for each time slide.
+
+    private int _currentValidSlideNum;
+
+    private static Timer _timer = new Timer();
+
+    /**
+     * @param validTimeWindowInMils events that happened before validTimeWindowInMils are not considered
+     *                        when reporting the rate.
+     * @param numOfSlides the number of time sildes to divide validTimeWindows. The more slides,
+     *                    the smother the reported results will be.
+     */
+    public RateTracker(int validTimeWindowInMils, int numOfSlides) {
+        this(validTimeWindowInMils, numOfSlides, false);
+    }
+
+    /**
+     * Constructor
+     * @param validTimeWindowInMils events that happened before validTimeWindow are not considered
+     *                        when reporting the rate.
+     * @param numOfSlides the number of time sildes to divide validTimeWindows. The more slides,
+     *                    the smother the reported results will be.
+     * @param simulate set true if it use simulated time rather than system time for testing purpose.
+     */
+    public RateTracker(int validTimeWindowInMils, int numOfSlides, boolean simulate ){
+        _numOfSlides = Math.max(numOfSlides, 1);
+        _slideSizeInMils = validTimeWindowInMils / _numOfSlides;
+        if (_slideSizeInMils < 1 ) {
+            throw new IllegalArgumentException("Illeggal argument for RateTracker");
+        }
+        assert(_slideSizeInMils > 1);
+        _histograms = new long[_numOfSlides];
+        Arrays.fill(_histograms,0L);
+        if(!simulate) {
+            _timer.scheduleAtFixedRate(new Fresher(), _slideSizeInMils, _slideSizeInMils);
+        }
+        _currentValidSlideNum = 1;
+    }
+
+    /**
+     * Notify the tracker upon new arrivals
+     *
+     * @param count number of arrivals
+     */
+    public void notify(long count) {
+        _histograms[_histograms.length-1]+=count;
+    }
+
+    /**
+     * Return the average rate in slides.
+     *
+     * @return the average rate
+     */
+    public final float reportRate() {
+        long sum = 0;
+        long duration = _currentValidSlideNum * _slideSizeInMils;
+        for(int i=_numOfSlides - _currentValidSlideNum; i < _numOfSlides; i++ ){
+            sum += _histograms[i];
+        }
+
+        return sum / (float) duration * 1000;
+    }
+
+    public final void forceUpdateSlides(int numToEclipse) {
+
+        for(int i=0; i< numToEclipse; i++) {
+            updateSlides();
+        }
+
+    }
+
+    private void updateSlides(){
+
+        for (int i = 0; i < _numOfSlides - 1; i++) {
+            _histograms[i] = _histograms[i + 1];
+        }
+
+        _histograms[_histograms.length - 1] = 0;
+
+        _currentValidSlideNum = Math.min(_currentValidSlideNum + 1, _numOfSlides);
+    }
+
+    private class Fresher extends TimerTask {
+        public void run () {
+            updateSlides();
+        }
+    }
+
+
+}


[5/6] storm git commit: Merge branch 'queue-metrics' of https://github.com/wangli1426/storm into STORM-1007

Posted by ka...@apache.org.
Merge branch 'queue-metrics' of https://github.com/wangli1426/storm into STORM-1007


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ae1b8816
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ae1b8816
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ae1b8816

Branch: refs/heads/master
Commit: ae1b8816b06e99d860784a195dca68055c23b6bb
Parents: 7f445d7 f15ac94
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Sep 18 07:29:16 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Sep 18 07:29:16 2015 +0900

----------------------------------------------------------------------
 .../backtype/storm/utils/DisruptorQueue.java    |  58 ++++++++-
 .../jvm/backtype/storm/utils/RateTracker.java   | 119 +++++++++++++++++++
 .../backtype/storm/utils/RateTrackerTest.java   |  62 ++++++++++
 3 files changed, 234 insertions(+), 5 deletions(-)
----------------------------------------------------------------------



[4/6] storm git commit: Element sojourn time, a new metric, is added to QueueMetrics

Posted by ka...@apache.org.
Element sojourn time, a new metric, is added to QueueMetrics


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f15ac94d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f15ac94d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f15ac94d

Branch: refs/heads/master
Commit: f15ac94d259a96ac891f5243af29b0d953ea863d
Parents: 272a9c6
Author: wangli1426 <wa...@gmail.com>
Authored: Thu Sep 3 23:00:31 2015 +0800
Committer: wangli1426 <wa...@gmail.com>
Committed: Thu Sep 3 23:00:31 2015 +0800

----------------------------------------------------------------------
 .../src/jvm/backtype/storm/utils/DisruptorQueue.java      | 10 +++++++++-
 .../test/jvm/backtype/storm/utils/RateTrackerTest.java    |  4 ++--
 2 files changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f15ac94d/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 4448959..c54232e 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -72,7 +72,7 @@ public class DisruptorQueue implements IStatefulObject {
         _consumer = new Sequence();
         _barrier = _buffer.newBarrier();
         _buffer.setGatingSequences(_consumer);
-        _metrics = new QueueMetrics((float)0.05);
+        _metrics = new QueueMetrics((float) 0.05);
 
         if (claim instanceof SingleThreadedClaimStrategy) {
             consumerStartedFlag = true;
@@ -277,11 +277,19 @@ public class DisruptorQueue implements IStatefulObject {
 
             final float arrivalRateInMils = _rateTracker.reportRate() / _sampleRate;
 
+            /*
+            Assume the queue is stable, in which the arrival rate is equal to the consumption rate.
+            If this assumption does not hold, the calculation of sojourn time should also consider
+            departure rate according to Queuing Theory.
+             */
+            final float sojournTime = (wp - rp) / (float) Math.max(arrivalRateInMils, 0.00001);
+
             state.put("capacity", capacity());
             state.put("population", wp - rp);
             state.put("write_pos", wp);
             state.put("read_pos", rp);
             state.put("arrival_rate", arrivalRateInMils); //arrivals per millisecond
+            state.put("sojourn_time", sojournTime); //element sojourn time in milliseconds
 
             return state;
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/f15ac94d/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java b/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
index fde112e..d11bbf6 100644
--- a/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
+++ b/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.


[6/6] storm git commit: add STORM-1007 to CHANGELOG.md

Posted by ka...@apache.org.
add STORM-1007 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a5a7ba18
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a5a7ba18
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a5a7ba18

Branch: refs/heads/master
Commit: a5a7ba181017413674c4edaf09cf003dfe8c8e62
Parents: ae1b881
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Sep 18 07:37:08 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Sep 18 07:37:08 2015 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a5a7ba18/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index fc076fc..587a6a2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1007: Add more metrics to DisruptorQueue
  * STORM-1011: HBaseBolt default mapper should handle null values
  * STORM-1019: Added missing dependency version to use of org.codehaus.mojo:make-maven-plugin
  * STORM-1020: Document exceptions in ITuple & Fields


[3/6] storm git commit: Leverage sampling to reduce the overhead of QueueMetrics

Posted by ka...@apache.org.
Leverage sampling to reduce the overhead of QueueMetrics


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/272a9c6b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/272a9c6b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/272a9c6b

Branch: refs/heads/master
Commit: 272a9c6ba73e609e9f503638ef7258021681365c
Parents: d03caab
Author: wangli1426 <wa...@gmail.com>
Authored: Thu Sep 3 22:45:17 2015 +0800
Committer: wangli1426 <wa...@gmail.com>
Committed: Thu Sep 3 22:45:17 2015 +0800

----------------------------------------------------------------------
 .../backtype/storm/utils/DisruptorQueue.java    | 34 ++++++++++++++++++--
 1 file changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/272a9c6b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 2ed26a5..4448959 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -28,6 +28,7 @@ import com.lmax.disruptor.SequenceBarrier;
 import com.lmax.disruptor.SingleThreadedClaimStrategy;
 import com.lmax.disruptor.WaitStrategy;
 
+import java.util.Random;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
@@ -71,7 +72,7 @@ public class DisruptorQueue implements IStatefulObject {
         _consumer = new Sequence();
         _barrier = _buffer.newBarrier();
         _buffer.setGatingSequences(_consumer);
-        _metrics = new QueueMetrics();
+        _metrics = new QueueMetrics((float)0.05);
 
         if (claim instanceof SingleThreadedClaimStrategy) {
             consumerStartedFlag = true;
@@ -225,6 +226,27 @@ public class DisruptorQueue implements IStatefulObject {
     public class QueueMetrics {
 
         private final RateTracker _rateTracker = new RateTracker(10000, 10);
+        private final float _sampleRate;
+        private Random _random;
+
+        public QueueMetrics() throws IllegalArgumentException {
+            this(1);
+        }
+
+        /**
+         * @param sampleRate a number between 0 and 1. The higher it is, the accurate the metrics
+         *                   will be. Using a reasonable sampleRate, e.g., 0.1, could effectively reduce the
+         *                   metric maintenance cost while providing good accuracy.
+         */
+        public QueueMetrics(float sampleRate) throws IllegalArgumentException {
+
+            if (sampleRate <= 0 || sampleRate > 1)
+                throw new IllegalArgumentException("sampleRate should be a value between (0,1].");
+
+            _sampleRate = sampleRate;
+
+            _random = new Random();
+        }
 
         public long writePos() {
             return _buffer.getCursor();
@@ -253,7 +275,7 @@ public class DisruptorQueue implements IStatefulObject {
             long rp = readPos();
             long wp = writePos();
 
-            final float arrivalRateInMils = _rateTracker.reportRate();
+            final float arrivalRateInMils = _rateTracker.reportRate() / _sampleRate;
 
             state.put("capacity", capacity());
             state.put("population", wp - rp);
@@ -265,9 +287,15 @@ public class DisruptorQueue implements IStatefulObject {
         }
 
         public void notifyArrivals(long counts) {
-            _rateTracker.notify(counts);
+            if (sample())
+                _rateTracker.notify(counts);
         }
 
+        final private boolean sample() {
+            if (_sampleRate == 1 || _random.nextFloat() < _sampleRate)
+                return true;
+            return false;
+        }
     }
 
 }
\ No newline at end of file


[2/6] storm git commit: Created test cases for RateTracker

Posted by ka...@apache.org.
Created test cases for RateTracker


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d03caab0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d03caab0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d03caab0

Branch: refs/heads/master
Commit: d03caab09307af5f8960f33632006b5ff18941ef
Parents: 3b8db7a
Author: wangli1426 <wa...@gmail.com>
Authored: Wed Sep 2 22:43:51 2015 +0800
Committer: wangli1426 <wa...@gmail.com>
Committed: Wed Sep 2 22:43:51 2015 +0800

----------------------------------------------------------------------
 .../backtype/storm/utils/RateTrackerTest.java   | 62 ++++++++++++++++++++
 1 file changed, 62 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d03caab0/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java b/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
new file mode 100644
index 0000000..fde112e
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/utils/RateTrackerTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import junit.framework.TestCase;
+
+/**
+ * Unit test for RateTracker
+ */
+public class RateTrackerTest extends TestCase {
+
+    @Test
+    public void testEclipsedAllWindows() {
+        RateTracker rt = new RateTracker(10000, 10, true);
+        rt.notify(10);
+        rt.forceUpdateSlides(10);
+        assert (rt.reportRate() == 0);
+    }
+
+    @Test
+    public void testEclipsedOneWindow() {
+        RateTracker rt = new RateTracker(10000, 10, true);
+        rt.notify(1);
+        float r1 = rt.reportRate();
+        rt.forceUpdateSlides(1);
+        rt.notify(1);
+        float r2 = rt.reportRate();
+
+        System.out.format("r1:%f, r2:%f\n", r1, r2);
+
+        assert (r1 == r2);
+    }
+
+    @Test
+    public void testEclipsedNineWindows() {
+        RateTracker rt = new RateTracker(10000, 10, true);
+        rt.notify(1);
+        float r1 = rt.reportRate();
+        rt.forceUpdateSlides(9);
+        rt.notify(9);
+        float r2 = rt.reportRate();
+
+        assert (r1 == r2);
+    }
+}
\ No newline at end of file